package drds.plus.sql_process.optimizer.execute_plan_optimizer;

import drds.plus.common.jdbc.Parameters;
import drds.plus.common.properties.ConnectionProperties;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.ExecutePlan;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.query.*;
import drds.plus.sql_process.utils.OptimizerUtils;

import java.util.Map;

/**
 * streaming模式处理
 *
 * <pre>
 * 几种情况需要使用streaming
 * 1. 当前执行计划节点，包括子节点在内，不存在where条件，比如select * from xxx. 选择streaming模式
 * 2. 针对merge/join/subquery节点，如果存在limit参数
 *    a. 针对merge节点，因为limit参数可下推，需要判断下limit from+to的参数值是否超过阀值(下推到子节点后是0->from+to)
 *    b. 针对join和subquery，目前未做limit参数下推，所以直接选择streaming模式
 *    c. 针对query节点，无子节点并且父节点不为streaming模式，当前节点不选择streaming.
 * </pre>
 */
public class StreamingOptimizer implements ExecutePlanOptimizer {

    /**
     * 没有聚合操作，没有hint chars，没有条件
     */
    private static boolean isSimpleQuery(Query query) {
        if (query.getSql() != null) { // 如果是sql单库下推，不管
            return false;
        }

        Comparable from = query.getLimitFrom();
        Comparable to = query.getLimitTo();
        // 不存在limit
        return from == null && to == null && OptimizerUtils.isNoFilter(query) && !query.isExistAggregate();
    }

    private static boolean isNeedStreaming(Query query, Map<String, Object> extraCmd) {
        // 存在limit条件
        Comparable from = query.getLimitFrom();
        Comparable to = query.getLimitTo();
        return from != null || to != null;
    }

    public ExecutePlan optimize(ExecutePlan executePlan, Parameters parametersettings, Map<String, Object> extraCmd) {
        boolean parentStreaming = false;
        String forceStreamingValue = (String) (extraCmd.get(ConnectionProperties.force_streaming));
        if (forceStreamingValue != null) {
            boolean forceStreaming = Boolean.parseBoolean(forceStreamingValue);
            if (forceStreaming == false) { // 强制关闭，忽略此优化
                return executePlan;
            }
            parentStreaming = forceStreaming;
        }

        if (executePlan instanceof Query) {
            this.findQueryOptimizeStreaming(executePlan, parentStreaming, extraCmd);
        }

        return executePlan;
    }

    /**
     * @param parentStreaming 只要是QueryTree,都进行传递
     *                        递归设置streaming，比如父节点parentStreaming为true时，当前节点包括子节点都应采用streaming模式来保证
     */
    private void findQueryOptimizeStreaming(ExecutePlan executePlan, boolean parentStreaming, Map<String, Object> extraCmd) {
        if (!(executePlan instanceof Query)) {// [1]
            return;
        }

        // 当前节点如果无任何条件,并且无limit
        if (isSimpleQuery((Query) executePlan)) {// [2]
            parentStreaming = true;
        }

        boolean streaming = (parentStreaming || isNeedStreaming((Query) executePlan, extraCmd));// [3]
        if (executePlan instanceof MergeQuery) {
            for (ExecutePlan subExecutePlan : ((MergeQuery) executePlan).getExecutePlanList()) {
                this.findQueryOptimizeStreaming(subExecutePlan, streaming, extraCmd);
            }
        } else if (executePlan instanceof Join) {
            this.findQueryOptimizeStreaming(((Join) executePlan).getLeftNode(), streaming, extraCmd);
            if (((Join) executePlan).getJoinStrategy() == JoinStrategy.sort_merge_join) {
                // 针对sort sort join，需要考虑右表，其余情况为in模式和block模式，不需要理会streaming
                this.findQueryOptimizeStreaming(((Join) executePlan).getRightNode(), streaming, extraCmd);
            }
        } else if (executePlan instanceof QueryWithIndex) {
            if (((QueryWithIndex) executePlan).getSubQuery() != null) {
                this.findQueryOptimizeStreaming(((QueryWithIndex) executePlan).getSubQuery(), streaming, extraCmd);
            }
        }

        // 针对可下推的case，不做单独判断，执行器执行的时候如果发现整个节点可下推，不会理会子节点的streaming属性
        executePlan.setStreaming(parentStreaming);
    }

}
